Spark RDD 的 Action操作

Posted by Jackson on 2017-10-19

1. reduce(func)

通过func函数聚集RDD中的所有元素,这个功能必须是可交换且可并联的

1
2
3
4
5
6
7
8
9
10
11
scala> val rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[85] at makeRDD at <console>:24

scala> rdd1.reduce(_+_)
res50: Int = 55

scala> val rdd2 = sc.makeRDD(Array(("a",1),("a",3),("c",3),("d",5)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[86] at makeRDD at <console>:24

scala> rdd2.reduce((x,y)=>(x._1 + y._1,x._2 + y._2))
res51: (String, Int) = (adca,12)

2.collect()

在驱动程序中,以数组的形式返回数据集的所有元素

3.count()

返回RDD的元素个数

4.first()

返回RDD的第一个元素(类似于take(1))

5.take(n)

返回一个由数据集的前n个元素组成的数组

6.takeSample(withReplacement,num, [seed])

返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子

7.takeOrdered(n)

返回前几个的排序

8.aggregate

(zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)
aggregate函数将每个分区里面的元素通过seqOp和初始值进行聚合,然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作。这个函数最终返回的类型不需要和RDD中元素类型一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[88] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res56: Int = 58

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x * y},
| {(a : Int,b : Int) => a + b}
| )
res57: Int = 30361

9.fold(num)(func)

折叠操作,aggregate的简化操作,seqop和combop一样。

1
2
3
4
5
6
7
8
9
10
11
scala> var rdd1 = sc.makeRDD(1 to 4,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[90] at makeRDD at <console>:24

scala> rdd1.aggregate(1)(
| {(x : Int,y : Int) => x + y},
| {(a : Int,b : Int) => a + b}
| )
res59: Int = 13

scala> rdd1.fold(1)(_+_)
res60: Int = 13

10.saveAsTextFile(path)

将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本

11.saveAsSequenceFile(path)

将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。

12.saveAsObjectFile(path)

用于将RDD中的元素序列化成对象,存储到文件中。

13.countByKey()

针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。

1
2
3
4
5
scala> val rdd = sc.parallelize(List((1,3),(1,2),(1,4),(2,3),(3,6),(3,8)),3)
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[95] at parallelize at <console>:24

scala> rdd.countByKey()
res63: scala.collection.Map[Int,Long] = Map(3 -> 2, 1 -> 3, 2 -> 1)

14.foreach(func)

在数据集的每一个元素上,运行函数func进行更新。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
scala> var rdd = sc.makeRDD(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[107] at makeRDD at <console>:24

scala> var sum = sc.accumulator(0)
warning: there were two deprecation warnings; re-run with -deprecation for details
sum: org.apache.spark.Accumulator[Int] = 0

scala> rdd.foreach(sum+=_)

scala> sum.value
res68: Int = 55

scala> rdd.collect().foreach(println)
1
2
3
4
5
6
7
8
9
10

注意:reduceByKey是转换操作,reduceByKey和groupByKey的区别是reduceByKey会对数据进行预聚合操作